package cn._51doit.flink.day04;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *
 */
public class ProjectDemo {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //辽宁省,沈阳市,3000
        //辽宁省,大连市,4000
        //辽宁省,鞍山市,4000
        //河北省,廊坊市,2000
        //河北省,邢台市,3000
        //河北省,石家庄市,2000
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //对数据进行映射
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> tpStream = lines.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String province = fields[0];
                String city = fields[1];
                int money = Integer.parseInt(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });

        //project：投影，只能指定tuple类型的DataStream进行操作，可以取出部分字段并且改变顺序
        //功能类似于map算子
        SingleOutputStreamOperator<Tuple> res = tpStream.project(0, 2);

        res.print();


        env.execute();



    }
}
